#!/bin/bash
# File Name   : analysis_binlog.sh
# Author      : moshan
# Mail        : mo_shan@yeah.net
# Created Time: 2019-04-11 18:34:42
# Function    : Analysis binlog for MySQL
#########################################################################
work_dir='/data/git/analysis_binlog'
. ${work_dir}/function/logging.sh
mysqlbinlog="${1}"
binlog_file_path="${2}"
binlog_file="$(awk -F'/' '{print $NF}' <<< "${binlog_file_path}")"
sql_res_file="${3}"
save_type="${4}"
localhost_ip="${5}"
thread_num="${6}"
limit_sql_count="${7}"
res_dir="$(awk -F'/' 'OFS="/",NF-=1' <<< "${sql_res_file}")/table"
big_dir="${res_dir}/big"
[ ! -d "${res_dir}" -a "${save_type}x" != "filex" ] && mkdir -p ${res_dir} ${big_dir}
function f_analysis_binlog()
{
    eval ${mysqlbinlog} --base64-output=decode-rows -vv ${binlog_file_path} 2>/dev/null|awk -v res_dir="${res_dir}" -v binlog_file="${binlog_file}" -v save_type="${save_type}" -v limit_count="${limit_sql_count}" 'BEGIN {
        db_name="";
        t_name="";
        count=0;
        start=0;
        flag=0;
        s_count=0;
        sql_index=0;
        sql_count=0;
        begin_num=0;
    }
    {
        if(match($0, /^#.*server id.*Table_map:.*mapped to number/) && flag==0) 
        {
            split($(NF-4),a,"`");
            db_name=a[2];
            t_name=a[4];
            t_name_tmp=(db_name"."t_name);
            flag=1;
            t_time=(substr($1,2,6)" "$2);
            res_file=(res_dir"/"binlog_file"_"t_name_tmp".log")
            res_file_for_big=(res_dir"/big/"binlog_file"_"t_name_tmp".log")
            sql[sql_index]=("BEGIN\n/*start time:"t_time"*/");
            if (save_type=="table" || save_type=="all")
            {
                printf "%s\n", sql[sql_index] >> res_file;
            }
            begin_num=sql_index;
            sql_index+=1;
        }
        else if (match($0, /(### INSERT INTO .*\..*)/)) 
        {
            s_count+=1;
            sql[sql_index]=("INSERT INTO "t_name_tmp);
            if (save_type=="table" || save_type=="all")
            {
                printf "%s\n", sql[sql_index] >> res_file;
            }
            sql_index+=1;
            sql_count+=1;
        }
        else if (match($0, /(### UPDATE .*\..*)/)) 
        {
            s_count+=1;
            sql[sql_index]=("UPDATE "t_name_tmp);
            if (save_type=="table" || save_type=="all")
            {
                printf "%s\n", sql[sql_index] >> res_file;
            }
            sql_index+=1;
            sql_count+=1;
        }
        else if (match($0, /(### DELETE FROM .*\..*)/)) 
        {
            s_count+=1;
            sql[sql_index]=("DELETE FROM "t_name_tmp);
            if (save_type=="table" || save_type=="all")
            {
                printf "%s\n", sql[sql_index] >> res_file;
            }
            sql_index+=1;
            sql_count+=1;
        }
        else if (match($0, /^(# at) /) && flag==1 && s_count>0) 
        {
            s_count=0;
        }
        else if (match($0, /(#.*server id.*end_log_pos.*Xid =.*)/) && flag==1) 
        {
            sql[sql_index]=("/*stop time:"substr($1,2,6)" "$2"*/");
            if (save_type=="table" || save_type=="all")
            {
                printf "%s\n", sql[sql_index] >> res_file;
            }
            sql_index+=1;
        }
        else if (match($0, /^(COMMIT)/) && flag==1) 
        {
            flag=0;
            cont=0;
            sql[sql_index]="COMMIT\n";
            if (save_type=="table" || save_type=="all")
            {
                printf "%s\n", sql[sql_index] >> res_file;
            }


            if(sql_count >= limit_count)
            {
                tmp_len=length(sql);
                for (ti=begin_num; ti<=tmp_len; ti++)
                {
                    printf "%s\n", sql[ti] >> res_file_for_big;
                }
            }

            sql_count=0;

            sql_index+=1;
        }
        else if (match($0, /^(COMMIT)/)) 
        {
            count=0;
            flag=0;
        }
        else if (match($0, /(### )/))
        {
            split($0,tmp,"/*");
            split(tmp[1],tmp2,"###");
            sql[sql_index]=tmp2[2];
            if (save_type=="table" || save_type=="all")
            {
                printf "%s\n", sql[sql_index] >> res_file;
            }
            sql_index+=1;
        }
    } END {
        if (save_type!="table")
        {
            array_length=length(sql);
            for (i=0;i<=array_length;i++)
            {
                print sql[i];
            }
        }
    }' > ${sql_res_file} #2>/dev/null

    if [ $? -eq 0 ]
    then
        f_logging "INFO" "THREAD_${thread_num}:Analysis completed --> ${binlog_file_path}" "2"|tee -a ${log_file}
    else
        f_logging "ERROR" "THREAD_${thread_num}:Analysis completed --> ${binlog_file_path}. The binlog file may be an empty transaction file."|tee -a ${log_file} 
    fi
    [ -f "${work_dir}/pid/${binlog_file}.pid" ] && rm -f ${work_dir}/pid/${binlog_file}.pid
    [ -f "${sql_res_file}" -a "${save_type}x" == "tablex" ] && rm -f "${sql_res_file}"
    [ -f "${work_dir}/thread/${thread_num}" ] && rm -f ${work_dir}/thread/${thread_num}
}
f_analysis_binlog "${1}" "${2}" "${3}" "${4}"
